Normalmente, este paso no lo hacemos en nuestra sesión de análisis: los datos están distribuidos en un cluster originalmente. Para nuestro ejemplo, limpiamos y cargamos los datos en memoria:
library(tidyverse)
Registered S3 method overwritten by 'dplyr':
method from
print.rowwise_df
Registered S3 methods overwritten by 'dbplyr':
method from
print.tbl_lazy
print.tbl_sql
[30m── [1mAttaching packages[22m ─────────────────────────────────────────────────────── tidyverse 1.3.0 ──[39m
[30m[32m✔[30m [34mggplot2[30m 3.2.1 [32m✔[30m [34mpurrr [30m 0.3.3
[32m✔[30m [34mtibble [30m 2.1.3 [32m✔[30m [34mdplyr [30m 0.8.3
[32m✔[30m [34mtidyr [30m 1.0.0 [32m✔[30m [34mstringr[30m 1.4.0
[32m✔[30m [34mreadr [30m 1.3.1 [32m✔[30m [34mforcats[30m 0.4.0[39m
[30m── [1mConflicts[22m ────────────────────────────────────────────────────────── tidyverse_conflicts() ──
[31m✖[30m [34mdplyr[30m::[32mfilter()[30m masks [34mstats[30m::filter()
[31m✖[30m [34mdplyr[30m::[32mlag()[30m masks [34mstats[30m::lag()[39m
limpiar <- function(lineas,...){
df_lista <- str_split(lineas, " ") %>%
keep(function(x) x[1] != '#') %>%
transpose %>%
map(function(col) as.character(col))
df <- data_frame(articulo = df_lista[[1]],
categorias = df_lista[[2]])
df
}
filtrado <- read_lines_chunked("../../datos/similitud/wiki-100000.txt",
skip = 1, callback = ListCallback$new(limpiar))
`data_frame()` is deprecated, use `tibble()`.
[90mThis warning is displayed once per session.[39m
Los datos completos están aquí
Consideramos los datos ya tokenizados (los tokens son las categorías):
articulos_df <- filtrado %>% bind_rows
articulos_df
Y registramos en el cluster (en este caso, corremos los scripts localmente):
library(sparklyr)
Registered S3 methods overwritten by 'htmltools':
method from
print.html tools:rstudio
print.shiny.tag tools:rstudio
print.shiny.tag.list tools:rstudio
Registered S3 method overwritten by 'htmlwidgets':
method from
print.htmlwidget tools:rstudio
Attaching package: ‘sparklyr’
The following object is masked from ‘package:purrr’:
invoke
config <- spark_config()
# configuración para modo local:
config$`sparklyr.shell.driver-memory` <- "2G" # para poder hacer collect de pares más adelante
sc <- spark_connect(master = "local", config = config)
* Using Spark: 2.4.0
# normalmente no copiamos de nuestra sesión de R a un cluster! Para este ejemplo
# con datos chicos es posible:
articulos_wiki_tbl <- copy_to(sc, articulos_df, "articulos_wiki", overwrite = TRUE)
articulos_wiki_tbl
Agrupamos los tokens en una lista:
art_agr <- articulos_wiki_tbl %>%
group_by(articulo) %>%
summarise(lista = collect_list(categorias))
Y binarizamos (la representación para usar la implementación de spark es de matriz rala: 1 cuando el token/shingle pertenece al documento, y 0 si no):
art_bin <- art_agr %>%
ft_count_vectorizer('lista', 'vector', binary = TRUE)
# estimator
lsh_wiki_estimator <- ft_minhash_lsh(sc, 'vector', 'hashes',
seed = 1227,
num_hash_tables = 5)
lsh_wiki_trans <- ml_fit(lsh_wiki_estimator, art_bin)
art_bin <- ml_transform(lsh_wiki_trans, art_bin)
art_bin %>% head(5)
Podemos encontrar vecinos cercanos
vec_1 <- art_bin %>% filter(articulo =='Alabama') %>% pull(vector)
similares <- ml_approx_nearest_neighbors(lsh_wiki_trans,
art_bin, vec_1[[1]], num_nearest_neighbors = 10) %>%
select(articulo, lista, distCol)
print(similares %>% collect)
Encontramos pares similares con un similarity join, por ejemplo:
art_bin <- art_bin %>% mutate(id = articulo)
pares_candidatos <- ml_approx_similarity_join(lsh_wiki_trans, art_bin, art_bin, 0.7,
dist_col = "distCol") %>% filter(id_a != id_b)
pares_candidatos %>% tally()
pares <- pares_candidatos %>% filter(distCol < 0.2)
pares %>% tally
pares <- pares %>% collect()
Por ejemplo
DT::datatable(pares %>% filter(str_detect(id_a, "poker") | str_detect(id_b, "poker")))
Nota: la implementación en spark de LSH utiliza solamente amplificación OR. Es posible usar suficientes hashes para obtener pares, y después filtrar los de la distancia que buscamos (¿Cómo implementar familias AND-OR)?